(レポート) BDT303: Netflix ビックデータプラットフォーム上のSpark and Prestoの運用 #reinvent
本セッションでは、Netflixを支えるデータ分析基盤である Amazon EMR の Spark と Prestoの運用について解説しています。10/14(水) AWS Black Belt Tech Webinar 2015 ~ AWS re:Invent 2015 SA 座談会 ~ では、ADSJの現役SAが記憶に残ったおすすめの2つに選ばれた珠玉のセッションのレポートです。
Netflix (ネットフリックス)(英: Netflix, Inc.、NASDAQ: NFLX)は、アメリカ合衆国のオンラインDVDレンタル及び映像ストリーミング配信事業会社で、日本でも2015年9月1日夜からストリーミングのみですが、サービスが開始されたことが話題になりました。
BDT303 Running Spark and Presto on the Netflix Big Data Platform
このセッション
- ビッグデータの規模
- アーキテクチャ
- Presto と Spark の
- ユースケース
- パフォーマンス
- 質問の投稿
- Amazon EMR のデプロイ
- Netflixのインフラストラクチャとのインテグレーション
通知されるデータは最適解を見つけるためのデータとして使用されます。Netflixのビックデータは全ての製品のアイディアに大きな影響を及ぼし、全社的に分析やETLを推進しています。
プロダクトマネージャーが利用するA/Bテストの分析で使用するアプリケーションです。対話的に必要に応じNetflixの画面から累積保持数やストリーミング時間を参照して、異なるテスト全体、異なる地域、異なるデバイス間で落ちたことがわかります。データパワーはこのアプリケーションが稼働してETLプロセスによって生成されます。
私達の一日の最大の関心事は本当にユーザーがストリーミングを見れているかであり、また、全ての番組のストリーミング速度を、異なる国々や異なるISP間などを横断的に、毎月ダッシュボードを公開しています。そうしないとストリーミングがうまく見れない理由がわからないからです。
リコメンデーションエンジンや、検索インデックスを構築に役立ちました
最大の課題「スケール」
Netflixの3大サービスがあり、AWS、Atlas、Cassandraです。
Netflixのビジネスの指標
- 6500万人以上の会員
- 50以上の国々
- 1000以上のデバイスサポート
- 1000億時間/quater
ビックデータの規模は
- 合計 25PB以上の DW (Amazon S3)
- 読込み 10%以上 DW daily
- 書込み 10%以上 毎日データの読み込み
- 日ごとに55000億イベント/日
- 350のアクティブプラットフォームのユーザー
世界進出
- 2016年には200カ国
アーキテクチャの全体像
アプリケーションで発生するイベントのストリームデータは、まずKafkaに蓄え、内部データパイプラインツールのUrsulaを使ってS3に記録する。一方、ディメンジョンデータは、Cassandraに保存し、SS Tablesを経由して、CassandraのバルクDataPipeline出力ツール aegisthus を使ってS3に保存します。
イベントデータは15分という短いサイクル、ディメンジョンデータは1日という比較的長い単位で、データの利用目的が異なります。
システムを構成するコンポーネントは上記のように分類されます。
ビックデータ処理のニーズは異なります。例えば、インタラクティブなデータ探索は presto であり、RT analytics(リアルタイム分析)や Iterative ML-Based Algorithm(反復的アルゴリズムによる機械学習)などにはSparkがニーズにマッチしています。
DW としての Amazon S3
本当の単一ソースとしてのS3
- 99.999999999%の耐久性と99.99%の可用性のために設計
- コンピュートとストレージの分離
- 鍵有効化
- 複数の異種のクラスタ
- r/b deploymentを経由して容易にアップグレード
- 並列異種のクラスタ
- r/b deploymentを経由を簡単にアップグレード可能
パフォーマンスについてはどうか?
- S3はクラスタよりもはるかに大きいクラスタ
- クラスタからのネットワーク負荷をオフロード
- 読み取り性能
- シングルステージは、読み取り専用のジョブが5〜10%の影響を与えています
- 無意味な一連の段階にわたって償却するとき
- 書き込みパフォーマンス
- S3は結果整合性(最終的に一貫性)であるので、高速になります
presto
prestoはギガバイトからペタバイトまでのあらゆる規模のデータソースに対して、対話型の分析クエリを実行するためのオープンソースの分散SQLクエリエンジンです。
なぜPrestoが大好きなの?
- Hadoop フレンドリー - Hive metastoreとの統合
- AWS上でうまく動作 - Amazon S3のと容易に統合
- スケーラブル - ペタバイト規模での動作
- ユーザーフレンドリー - ANSI SQL
- オープンソース - そして、Javaで実装されていること
- 速い
使用状況の統計
3500クエリー/日以上
Prestoのユースケースを広がり
- データ探索と実験
- データの妥当性検証
- 私たちのインタラクティブなA / Bテストの分析アプリケーションのバックエンド
- 報告
- (まだ?)ETLはありません
Presto のデプロイ
デプロイ
- Version 0.114
- + patcheの適用
- + 非パブリックなpatcheの適用 (Parquet vectorized read integration)
- Amazon EMR の bootstrap action を介してデプロイされる
- Hadoop YARN clusterからクラスタを分離
- Hadoop services を使わない
- クラスタ管理のAmazon EMRを活用
2つの製品クラスタ
- リソースの分離
- 暫定的なクラスタ
- 1つのコーディネーター(r3.4xl) + 225のワーカー(r3.4xl)
- 専用のアプリケーションクラスタ
- 1つのコーディネーター(r3.4xl) + 4つのワーカー+ 動的ワーカー(r3.4xl)
- Netflix spinnaker APIによる動的ワーカーのサイジング
動的クラスタのサイジング
Netflixインフラと統合
Prestoへの貢献
Parquet
- カラムナファイルフォーマット
- Hive, Pig, Presto, Spark 全てにサポート
- 異なる処理エンジン全体のパフォーマンス上の利点
- S3上で良好なパフォーマンス
- DWの多数は、Parquetファイルフォーマット
Vectorized Read
- Parquet: 行単位の代わりに、バッチでカラムチャンクを読む
- Presto: ParquetHiveRecordCursorをParquetPageSourceに入れ替え
- Prestoは最大2倍のパフォーマンスの向上
- Spark, Hive, Drillなどにとって有益
- 私たちはPrestoパッチをパブリッシュできる前に、ペンディング中の parquet-131をコミット
Predicate Pushdown
例:SELECT ... WHERE abtest_id = 10;
- データはPredicate column によってデータがクラスタ化されている場合に最もよく機能する
- メタデータのオーバーヘッドなしでHiveパーティションのようなアーカイブデータを剪定する
- また、Spark, Hive, Pigで実装されている
Atlas Analytics のユースケース
- 例のクエリは:その日のGenieからの4xxエラーを分析
- 述語としてアプリケーション名とメトリック名のための高カーディナリティ/選択性
- データは、Predicate columnによって段階化かつクラスタ化される
乞うご期待
- techblog.netflix.com上の2つの今後のブログ記事
- Parquet usage @ Netflixのビッグデータプラットフォーム
- Presto + Parquet の最適化とパフォーマンス
Spark
なぜSpark?
- バッチジョブ (Pig, Hive)
- ETL ジョブ
- レポートとその他の分析
- 対話的ジョブ(Presto)
- 反復的機械学習ジョブ(Spark)
- プログラムによるユースケース
Deployments @ Netflix
- Spark on Mesos
- Self-serving AMI
- Full BDAS (Berkeley Data Analytics Stack)
- Online streaming analytics
- Spark on YARN
- Spark as a service
- YARN application on Amazon EMR Hadoop
- Offline batch analytics
バージョンサポート
Multi-tenancy
YARNによる動的アロケーション
- リソース使用率の最適化
- Harness cluster のスケール
- それでも、対話型のパフォーマンスを提供する
Task Execution Model
Dynamic Allocation [SPARK-6954]
Cached Data
- Spark はデータをキャッシュすることを許す
- データセットのインタラクティブな再利用
- 反復な利用(機械学習)
- ダイナミックアロケーション
- 何の作業が保留されていないときのexecutorsを削除
Cached Executor Timeout [SPARK-7955]
val data = sqlContext .table("dse.admin_genie_job_d”) .filter($"dateint">=20150601 and $"dateint"<=20150830) data.persist data.count
Preemption [SPARK-8167]
- 現象
- Spark Task は “executor lost”エラーとともにランダムに失敗する
- 原因
- YARNの優先処理は優れていない
- 解決策
- 優先処理のタスクが失敗としてカウントされるべきではなく、再試行する必要がある
Reading / Processing / Writing
Amazon S3のリストの最適化
- 問題:メタデータがビッグデータであり、
- パーティションの数百万を持つテーブル
- ファイルそれぞれの何百ものパーティション
- クライアントは、ジョブを起動するのに長い時間がかかる
Input split computation
- mapreduce.input.fileinputformat.list-status.num-threads
- スレッドの数はリストを使用して指定された入力パスのブロックの位置を取得する
- Spark jobでこのプロパティを設定する助けにはならない
パーティション表のファイル一覧
SPARK-9926, SPARK-10340
- 現象
- Amazon S3の上のパーティションHiveテーブルの入力分割計算(Input split computation)が遅い
- 原因
- パーティションごとのファイルをリストが遅い
- S3Nファイルシステムはデータの局所性ヒントを計算
- 解決策
- AmazonS3Clientを使用して並列にバルクリストパーティション
- Amazon S3のオブジェクトの入力分割計算(Input split computation)をバイパスする
Amazon S3 Bulk Listing
パフォーマンスの向上
SELECT * FROM nccp_log WHERE dateint=20150801 and hour=0 LIMIT 10;
Hadoop Output Committer
- どのように動作する
- 各タスクは一時ディレクトリに出力を書き込む
- 出力コミッターは最終出力先にとっての最初に成功したタスクの一時ディレクトリの名前を変更
- Amazon S3のと課題
- Amazon S3の名の変更は、コピーして(非アトミック)を削除で
- Amazon S3のは、結果整合性です
Amazon S3 Output Committer
- どのように動作する
- 各タスクはローカルディスクに出力を書き込む
- 出力コミッターはAmazon S3に最初に成功したタスクの出力をコピーする
- メリット
- 冗長なAmazon S3のコピーを避ける
- 結果整合性を避ける
- 常に新しいパスへの書き込む
Our Contributions
SPARK-6018, SPARK-6662, SPARK-6909, SPARK-6910, SPARK-7037, SPARK-7451,SPARK-7850,SPARK-8355, SPARK-8572, SPARK-8908, SPARK-9270, SPARK-9926, SPARK-10001, SPARK-10340
Next Steps for Netflix Integration
- Metrics
- Data lineage
- Parquet integration
Key Takeaways
- 本当のDWソースはAmazon S3上にある
- Amazon EMR上でカスタムのPrestoとSparkディストリビューションを実行させる
- スタンドアローンクラスタのPresto
- Hadoop YARN クラスタ上のSpark 共同テナント
- 私たちはオープンソースにコミットされ、私が実行しているものをあなたも実行できます
今後の展望
スケーリングおよびインフラストラクチャの最適化について
Amazon EMR の 上品なシュリンク と Amazon EMR における異種インスタンスグループ と Netflix Atlas metrics と Netflix Spinnaker API は、 Load-based expand/shrink of Hadoop YARN clusters
Presto と Spark で...
新しいPrestoのユースケースを拡大
NetflixのビッグデータプラットフォームでSparkを統合
ETLのユースケースのSparkを探索
まとめ
本日ご紹介した事例では、Prestoのクエリーの結果が返るまでの時間はHiveよりも圧倒的に速いことが紹介されていました。Prestoはペタバイト規模での動作やANSI SQLが使えることから、対話型の分析クエリを実行する目的で利用していました。一方、SparkはETLジョブ、反復的機械学習ジョブ(Spark)、プログラムによるユースケースと、相互補完的にユースケースを分けています。EMR3系では、PrestoやSparkはサポートしていなかったので、bootstrapactionを入れて導入していますが、最新のEMR4.1ではともに導入済みです。
EMRの強みはウェブのアクセスログといった非構造化データを構造化データに変換できることです。その過程でデータのバリデーションや変換、ETLなどを行い、場合によっては機械学習も行います。EMR4.0ではSpark、EMR4.1ではPrestoが標準でサポートされましたので、今回のプラクティスをカジュアルにお試しいただき、ぜひ本番投入をご検討していただけたらと思います。